package cn.edu360.report

import cn.edu360.beans.ReportLogDataAnalysis
import cn.edu360.utils.{ConfigHandler, MySQLHandler}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 统计日志文件中各省市的数据分布情况
  * 第二种方式：
  * Core方式
  * sheep.Old @ 64341393
  * Created 2018/5/9
  */
object LogDataAnalysisCore {

  /**
    * Entry point: reads the parquet log data, counts records per
    * (province, city) pair with the RDD (core) API, then writes the
    * result both to a local JSON file and to MySQL.
    *
    * @param args unused command-line arguments
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("统计日志文件中各省市的数据分布情况")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    // Ensure the SparkContext is always released, even when a stage fails.
    try {
      val sqlContext = new SQLContext(sc)
      // Load the raw log data written earlier as parquet.
      val rawDataFrame = sqlContext.read.parquet(ConfigHandler.parquetPath)
      // Count occurrences per (province, city) key.
      val result: RDD[((String, String), Int)] = rawDataFrame.map { row =>
        val provinceName = row.getAs[String]("provincename")
        val cityName = row.getAs[String]("cityname")
        ((provinceName, cityName), 1)
      }.reduceByKey(_ + _)
      import sqlContext.implicits._
      // Reshape ((province, city), count) into the report bean for output.
      val resultDF: DataFrame =
        result.map(t => ReportLogDataAnalysis(t._2, t._1._1, t._1._2)).toDF()
      // Write the result as a single local JSON file (coalesce(1) keeps one part file).
      resultDF.coalesce(1).write.json(ConfigHandler.logdataAnalysisResultJsonPath)
      // Write the result to MySQL.
      //resultDF.write.mode(SaveMode.Overwrite).jdbc(ConfigHandler.url,ConfigHandler.logDataAnalysisTableName,ConfigHandler.props)
      MySQLHandler.save2db(resultDF, ConfigHandler.logDataAnalysisTableName)
    } finally {
      sc.stop()
    }
  }
}
